package com.tpvlog.seckill.inventory;

import com.alibaba.fastjson.JSONObject;
import com.tpvlog.seckill.common.JedisManager;
import com.tpvlog.seckill.common.RedisCluster;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.nio.charset.StandardCharsets;
import java.util.List;

@Component
public class BootListener implements CommandLineRunner {

    public void run(String... strings) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "seckill-inventory-consumer-group");
        consumer.setNamesrvAddr("localhost:9876");

        consumer.subscribe("seckill_product_added_topic", "*");
        consumer.subscribe("order_pay_result_inform", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts,
                                                            ConsumeConcurrentlyContext context) {
                for(MessageExt messageExt : messageExts) {
                    System.out.println(new String(messageExt.getBody()));

                    String topic = messageExt.getTopic();

                    if(topic.equals("seckill_product_added_topic")) {
                        // 得知，秒杀场次里增加的是哪个商品
                        String seckillProductJSON = new String(messageExt.getBody());
                        JSONObject seckillProductJSONObject = JSONObject.parseObject(seckillProductJSON);

                        // 调用库存中心提供的接口，冻结商品用于秒杀活动的库存
                        Long productId = seckillProductJSONObject.getLong("productId");
                        Double seckillPrice = seckillProductJSONObject.getDouble("seckillPrice");
                        Long seckillStock = seckillProductJSONObject.getLong("seckillStock");

                        JedisManager jedisManager = JedisManager.getInstance();
                        Jedis jedis = jedisManager.getJedis();
                        String inventoryInitedFlag = jedis.get("seckill::product-inventory-inited::flag::" + productId);

                        if(inventoryInitedFlag != null && inventoryInitedFlag.equals("inited")) {
                            continue;
                        }
                        jedis.set("seckill::product-inventory-inited::flag::" + productId, "inited");

                        System.out.println("调用库存中心提供的接口，冻结商品用于秒杀活动的库存");
                        // 库存中心：可售库存、锁定库存、已售库存、冻结库存

                        // 把秒杀商品的库存进行分片，放在redis各个服务器上去
                        RedisCluster redisCluster = RedisCluster.getInstance();
                        redisCluster.initSeckillProductStock(productId,seckillStock);
                    }

                    else if(topic.equals("order_pay_result_inform")) {
                        // 解析订单支付结果的通知
                        JSONObject orderPayResult = JSONObject.parseObject(new String(messageExt.getBody()));
                        Long userId = orderPayResult.getLong("userId");
                        Long productId = orderPayResult.getLong("productId");
                        Boolean orderPaySuccess = orderPayResult.getInteger("orderPaySuccess") == 1 ? true : false;

                        // 幂等性保障
                        JedisManager jedisManager = JedisManager.getInstance();
                        Jedis jedis = jedisManager.getJedis();
                        String orderPayResultProcessedFlag = jedis.get("seckill::order-pay-result-processed::flag::" + userId + "::" + productId);

                        if(orderPayResultProcessedFlag != null && orderPayResultProcessedFlag.equals("processed")) {
                            continue;
                        }
                        jedis.set("seckill::order-pay-result-processed::flag::" + userId + "::" + productId, "processed");

                        // 获取当时秒杀成功时的库存分片所在redis节点
                        String stockShardRedisNode = jedis.get("flash_sale::stock_shard::" + userId + "::" + productId);

                        RedisCluster redisCluster = RedisCluster.getInstance();

                        // 如果说秒杀订单支付成功了
                        if(orderPaySuccess) {
                            redisCluster.flashSaleOrderPaySuccess(stockShardRedisNode, productId);
                        }
                        // 如果说秒杀订单支付失败或者取消了
                        else {
                            redisCluster.flashSaleOrderPayFail(stockShardRedisNode, productId);
                        }

                        System.out.println("秒杀抢购商品的订单支付结果处理成功......");
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动......");
    }

}
